Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-35700

spark.sql.orc.filterPushdown not working with Spark 3.1.1 for tables with varchar data type

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 3.1.1
    • 3.2.0, 3.1.3
    • Kubernetes, PySpark, Spark Core
    • None
    • Spark 3.1.1 on K8S

    Description

      We are not able to upgrade to Spark 3.1.1 from Spark 2.4.x as the join on varchar column is failing which is unexpected and works on Spark 3.0.0.  We are trying to run it on Spark 3.1.1 (MR 3.2) on K8s 

      Below is my use case:

      Tables are external hive table and files are stored as ORC. We do have varchar column and when we are trying to perform join on varchar column we are getting the exception.

      As I understand Spark 3.1.1 have introduced varchar data type but seems its not well tested with ORC and does not have backward compatibility. I have even tried with below config without luck

      spark.sql.legacy.charVarcharAsString: "true"

      We are not getting the error when spark.sql.orc.filterPushdown=false

      Below is the code: Here col1 is of type varchar(32) in hive

      df = spark.sql("select col1, col2 from table1 a inner join table2 on b (a.col1=b.col1 and a.col2 > b.col2 )") 
      df.write.format("orc").option("compression", "zlib").mode("Append").save("<s3_path>")
      

      Below is the error:

       

      Job aborted due to stage failure: Task 43 in stage 5.0 failed 4 times, most recent failure: Lost task 43.3 in stage 5.0 (TID 524) (10.219.36.64 executor 5): java.lang.UnsupportedOperationException: DataType: varchar(32)
      	at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.getPredicateLeafType(OrcFilters.scala:150)
      	at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.getType$1(OrcFilters.scala:222)
      	at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.buildLeafSearchArgument(OrcFilters.scala:266)
      	at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.convertibleFiltersHelper$1(OrcFilters.scala:132)
      	at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.$anonfun$convertibleFilters$4(OrcFilters.scala:135)
      	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
      	at scala.collection.immutable.List.foreach(List.scala:392)
      	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
      	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
      	at scala.collection.immutable.List.flatMap(List.scala:355)
      	at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.convertibleFilters(OrcFilters.scala:134)
      	at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.createFilter(OrcFilters.scala:73)
      	at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.$anonfun$buildReaderWithPartitionValues$4(OrcFileFormat.scala:189)
      	at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.$anonfun$buildReaderWithPartitionValues$4$adapted(OrcFileFormat.scala:188)
      	at scala.Option.foreach(Option.scala:407)
      	at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.$anonfun$buildReaderWithPartitionValues$1(OrcFileFormat.scala:188)
      	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:116)
      	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:169)
      	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
      	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:503)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.columnartorow_nextBatch_0$(Unknown Source)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
      	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
      	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
      	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:177)
      	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
      	at org.apache.spark.scheduler.Task.run(Task.scala:131)
      	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
      	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
      	at java.base/java.lang.Thread.run(Unknown Source)
      
      Driver stacktrace:3
      

       

      I can see there is no mapping of varchar in OrcFilters.scala:150

      https://github.com/apache/spark/blob/v3.1.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala#L142

       

       

      Attachments

        Issue Links

          Activity

            People

              Qin Yao Kent Yao
              arghya18 Arghya Saha
              Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: