Spark / SPARK-35287

RemoveRedundantProjects removes non-redundant projects


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.1.1
    • Fix Version/s: 3.2.0, 3.1.3
    • Component/s: SQL
    • Labels: None

    Description

      RemoveRedundantProjects erroneously removes non-redundant projects that are required to convert rows coming from DataSourceV2ScanExec to UnsafeRow. There is code that handles this case, but it only looks at the project's immediate child. The bug occurs when the DataSourceV2ScanExec is not a direct child of the project but a deeper descendant. The isRedundant method in RemoveRedundantProjects should be updated to account for descendants as well.
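
      The following is a minimal sketch of the kind of descendant-aware check this implies. It is illustrative only and not the actual Spark change: the object name and the simplified conditions are hypothetical, and I use the 3.1 class name DataSourceV2ScanExecBase. The point is simply that the rule must search the whole subtree under the project for a row-based (non-columnar) DataSourceV2 scan instead of inspecting only the immediate child.

      import org.apache.spark.sql.execution.ProjectExec
      import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase

      object IsRedundantSketch {
        // Hypothetical helper, not the real RemoveRedundantProjects.isRedundant:
        // a project sitting above a row-based (non-columnar) DSv2 scan must be
        // kept, because it is what turns the scan's InternalRows into UnsafeRows.
        def isRedundant(project: ProjectExec): Boolean = {
          // collectFirst visits the child and all of its descendants.
          val hasRowBasedV2ScanBelow = project.child.collectFirst {
            case scan: DataSourceV2ScanExecBase if !scan.supportsColumnar => scan
          }.isDefined
          // Only treat the project as redundant when no such scan exists anywhere
          // in the subtree and the project does not change the output.
          !hasRowBasedV2ScanBelow && project.output == project.child.output
        }
      }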

      My original scenario requires Iceberg to reproduce the issue. In theory, the bug should be reproducible with Spark SQL alone, and someone more familiar with Spark SQL internals should be able to construct such a scenario. The following is my reproduction (Spark 3.1.1, Iceberg 0.11.1):

      import scala.collection.JavaConverters._
      
      import org.apache.iceberg.{PartitionSpec, TableProperties}
      import org.apache.iceberg.hadoop.HadoopTables
      import org.apache.iceberg.spark.SparkSchemaUtil
      import org.apache.spark.sql.{DataFrame, QueryTest, SparkSession}
      import org.apache.spark.sql.internal.SQLConf
      
      class RemoveRedundantProjectsTest extends QueryTest {
        override val spark: SparkSession = SparkSession
          .builder()
          .master("local[4]")
          .config("spark.driver.bindAddress", "127.0.0.1")
          .appName(suiteName)
          .getOrCreate()
        test("RemoveRedundantProjects removes non-redundant projects") {
          // Force a sort-merge join (no broadcast) and interpreted execution
          // (no whole-stage codegen), and explicitly enable the rule under test.
          withSQLConf(
            SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
            SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
            SQLConf.REMOVE_REDUNDANT_PROJECTS_ENABLED.key -> "true") {
            withTempDir { dir =>
              val path = dir.getCanonicalPath
              val data = spark.range(3).toDF
              // Create an unpartitioned Iceberg table via HadoopTables, with its
              // data location pointed at the temp dir.
              val table = new HadoopTables().create(
                SparkSchemaUtil.convert(data.schema),
                PartitionSpec.unpartitioned(),
                Map(TableProperties.WRITE_NEW_DATA_LOCATION -> path).asJava,
                path)
              data.write.format("iceberg").mode("overwrite").save(path)
              table.refresh()
      
              val df = spark.read.format("iceberg").load(path)
              val dfX = df.as("x")
              val dfY = df.as("y")
              val join = dfX.filter(dfX("id") > 0).join(dfY, "id")
              join.explain("extended")
              assert(join.count() == 2)
            }
          }
        }
      }
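
      As a sanity check (a sketch only, intended to be appended inside the withTempDir block above so that df, withSQLConf and SQLConf are in scope), rerunning the identical join with the rule disabled is expected to pass, which points at RemoveRedundantProjects rather than at Iceberg:

      // Same join as above, but with RemoveRedundantProjects disabled. The
      // broadcast-join and codegen settings are inherited from the enclosing
      // withSQLConf block, so only the rule under test changes. Expectation
      // (not verified here): the count passes because the project converting
      // the scan's rows to UnsafeRow is no longer removed.
      withSQLConf(SQLConf.REMOVE_REDUNDANT_PROJECTS_ENABLED.key -> "false") {
        val dfX2 = df.as("x")
        val dfY2 = df.as("y")
        assert(dfX2.filter(dfX2("id") > 0).join(dfY2, "id").count() == 2)
      }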
      

      Stack trace from the failing run above:

      [info] - RemoveRedundantProjects removes non-redundant projects *** FAILED ***
      [info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 4) (xeroxms100.northamerica.corp.microsoft.com executor driver): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
      [info]  at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
      [info]  at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
      [info]  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
      [info]  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
      [info]  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      [info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
      [info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
      [info]  at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
      [info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
      [info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
      [info]  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      [info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
      [info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
      [info]  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      [info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
      [info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
      [info]  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      [info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
      [info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
      [info]  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      [info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
      [info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
      [info]  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
      [info]  at org.apache.spark.scheduler.Task.run(Task.scala:131)
      [info]  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
      [info]  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
      [info]  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
      [info]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      [info]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      [info]  at java.lang.Thread.run(Thread.java:748)
      [info]
      [info] Driver stacktrace:
      [info]   at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
      [info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
      [info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
      [info]   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
      [info]   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
      [info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
      [info]   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
      [info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
      [info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
      [info]   at scala.Option.foreach(Option.scala:407)
      [info]   ...
      [info]   Cause: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
      [info]   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
      [info]   at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
      [info]   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
      [info]   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
      [info]   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      [info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
      [info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
      [info]   at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
      [info]   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
      [info]   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
      [info]   ...
      

          People

            Assignee: Kousuke Saruta (sarutak)
            Reporter: Chungmin (chungmin)
            Votes: 0
            Watchers: 4
