  1. Spark
  2. SPARK-8128

Schema Merging Broken: Dataframe Fails to Recognize Column in Schema

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.3.0, 1.3.1, 1.4.0
    • Fix Version/s: None
    • Component/s: PySpark, Spark Core
    • Labels: None

    Description

      I'm loading a folder of about 600 parquet files into one DataFrame, so schema merging is involved. There appears to be a bug in the schema merging: when you print the schema, it shows all the attributes, but when you run a query that filters on one of those attributes, it errors saying the column is not in the schema. The query appears to be incorrectly going to one of the parquet files that does not have that attribute.

      # parquetFile is the SQLContext loader available in the affected 1.3.x and 1.4.0 releases
      sdf = sql_context.parquetFile('/parquet/big_data_folder')
      sdf.printSchema()
      root
       |-- _id: string (nullable = true)
       |-- addedOn: string (nullable = true)
       |-- attachment: string (nullable = true)
       .......
       |-- items: array (nullable = true)
       |    |-- element: struct (containsNull = true)
       |    |    |-- _id: string (nullable = true)
       |    |    |-- addedOn: string (nullable = true)
       |    |    |-- authorId: string (nullable = true)
       |    |    |-- mediaProcessingState: long (nullable = true)
       |-- mediaProcessingState: long (nullable = true)
       |-- title: string (nullable = true)
       |-- key: string (nullable = true)

      sdf.filter(sdf.mediaProcessingState == 3).count()

      This causes the following exception:

      Py4JJavaError: An error occurred while calling o67.count.
      : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1106 in stage 4.0 failed 30 times, most recent failure: Lost task 1106.29 in stage 4.0 (TID 70565, XXXXXXXXXXXXXXX): java.lang.IllegalArgumentException: Column [mediaProcessingState] was not found in schema!
      at parquet.Preconditions.checkArgument(Preconditions.java:47)
      at parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:172)
      at parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:160)
      at parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:142)
      at parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:76)
      at parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:41)
      at parquet.filter2.predicate.Operators$Eq.accept(Operators.java:162)
      at parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:46)
      at parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:41)
      at parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:22)
      at parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:108)
      at parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:28)
      at parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:158)
      at parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:138)
      at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
      at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
      at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
      at org.apache.spark.scheduler.Task.run(Task.scala:64)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      at java.lang.Thread.run(Thread.java:745)
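The trace above fails inside parquet's SchemaCompatibilityValidator while a record reader is being initialized for a single file, which fits the description: the merged schema contains the column, but at least one individual file does not. The following is only a toy model of that mismatch in plain Python, not Spark's or Parquet's actual code. The dictionaries, `merge_schemas`, and `validate_filter_column` are hypothetical stand-ins invented for illustration.

```python
# Toy model: plain dicts stand in for the schemas stored in Parquet files.
# None of these names come from Spark or Parquet; they are illustrative only.

def merge_schemas(file_schemas):
    """Union the columns of every file schema, keeping first-seen order."""
    merged = {}
    for schema in file_schemas:
        for name, dtype in schema.items():
            merged.setdefault(name, dtype)
    return merged


def validate_filter_column(column, file_schema):
    """Mimic a per-file check that rejects filters on unknown columns."""
    if column not in file_schema:
        raise ValueError("Column [%s] was not found in schema!" % column)


# Hypothetical files: file_b was written without mediaProcessingState.
file_a = {"_id": "string", "title": "string", "mediaProcessingState": "long"}
file_b = {"_id": "string", "title": "string"}

# The merged view, analogous to what printSchema() reports.
merged = merge_schemas([file_a, file_b])

# The filter column is valid against the merged schema and against file_a,
# but the same check against file_b alone raises ValueError.
validate_filter_column("mediaProcessingState", merged)
validate_filter_column("mediaProcessingState", file_a)
```

Calling `validate_filter_column("mediaProcessingState", file_b)` raises the error in this model, mirroring how a filter that is valid for the merged schema can still be rejected when it is checked against one file's own schema.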

      The same error occurs if you register the DataFrame as a temp table and execute the equivalent SQL query.
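Because the failing frames are in `parquet.filter2` and `RowGroupFilter`, the filter is evidently being pushed down to the Parquet reader. One thing that may be worth trying is turning that pushdown off through the `spark.sql.parquet.filterPushdown` setting. This is an assumption rather than something confirmed in this report. The sketch below wraps the call in a hypothetical helper, `disable_parquet_filter_pushdown`, and relies only on `SQLContext.setConf(key, value)`.

```python
def disable_parquet_filter_pushdown(sql_context):
    """Turn off Parquet predicate pushdown on the given SQLContext.

    Assumption: pushdown is what triggers the per-file schema check seen
    in the stack trace. This helper is illustrative, not an official fix.
    """
    sql_context.setConf("spark.sql.parquet.filterPushdown", "false")
    return sql_context


# Intended usage, with sql_context being the same SQLContext as in the report:
#   disable_parquet_filter_pushdown(sql_context)
#   sdf = sql_context.parquetFile('/parquet/big_data_folder')
#   sdf.filter(sdf.mediaProcessingState == 3).count()
```

If this avoids the error, the trade-off is that Spark evaluates the filter after reading the rows instead of letting the Parquet reader skip row groups, so scans may be slower.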

            People

              Assignee: Unassigned
              Reporter: Brad Willard (brdwrd)