SPARK-6982: Data Frame and Spark SQL should allow filtering on key portion of incremental parquet files


Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Cannot Reproduce
    • Affects Version/s: 1.3.0
    • Fix Version/s: None
    • Component/s: Spark Core, SQL

    Description

      I'm working with a 2.4-billion-row dataset. I just converted it to use the incremental schema features added to Parquet support in 1.3.0, where you save incremental files under directories named key=X.
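
      For reference, here's a minimal sketch of the layout and loading being described (the path and dates below are illustrative, not the real ones):

      # Directory layout on disk, one sub-directory per partition key:
      #   /super_large_dataset_over_time/key=2015-01-01/part-00000.parquet
      #   /super_large_dataset_over_time/key=2015-01-02/part-00000.parquet
      #   ...
      from pyspark import SparkContext
      from pyspark.sql import SQLContext

      sc = SparkContext(appName='partitioned-parquet-example')
      sql_context = SQLContext(sc)

      # Partition discovery (new in 1.3.0) picks up the key=... directories
      # and exposes "key" as a column on the resulting DataFrame.
      df = sql_context.parquetFile('/super_large_dataset_over_time')
      df.printSchema()  # schema includes the discovered "key" column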

      I'm currently saving files where the key is a timestamp, e.g. key=2015-01-01. If I run a query, the key comes back as an attribute in the row. It would be amazing to be able to do comparisons and filters on the key attribute, so that a query between two points in time could simply skip the partitions whose keys fall outside the range.

      df = sql_context.parquetFile('/super_large_dataset_over time')
      # Range filter on the partition key -- this is the query that currently fails
      df.filter(df.key >= '2015-03-24').filter(df.key < '2015-04-01').count()

      That job could then skip large portions of the dataset very quickly, even if the full Parquet dataset contains years of data.
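
      One way to sanity-check whether partitions would actually be skipped is to look at the query plan (a sketch; the exact output format varies between Spark versions):

      # Inspect the logical and physical plans to see whether the key range
      # filter is applied before the Parquet scan (i.e. partitions pruned).
      pruned = df.filter(df.key >= '2015-03-24').filter(df.key < '2015-04-01')
      pruned.explain(True)  # prints both plans to stdout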

      Currently that throws an error: the comparison is pushed down into the Parquet reader, and key is not part of the Parquet file schema (it only exists in the directory names), even though it is returned in the rows.

      However, it does, strangely, work with an IN clause, which is my current workaround:
      df.where('key in ("2015-04-02", "2015-04-03")').count()

      Job aborted due to stage failure: Task 122 in stage 6.0 failed 100 times, most recent failure: Lost task 122.99 in stage 6.0 (TID 39498, ip-XXXXXXXXXXXX): java.lang.IllegalArgumentException: Column [key] was not found in schema!
      at parquet.Preconditions.checkArgument(Preconditions.java:47)
      at parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:172)
      at parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:160)
      at parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:142)
      at parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:76)
      at parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:41)
      at parquet.filter2.predicate.Operators$Eq.accept(Operators.java:162)
      at parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:46)
      at parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:41)
      at parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:22)
      at parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:108)
      at parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:28)
      at parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:158)
      at parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:138)
      at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
      at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
      at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
      at org.apache.spark.rdd.NewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD.compute(NewHadoopRDD.scala:244)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
      at org.apache.spark.scheduler.Task.run(Task.scala:64)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      at java.lang.Thread.run(Thread.java:745)

      Driver stacktrace:
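
      For completeness, a sketch of generalizing the IN workaround above by enumerating the partition keys for a date range in Python first (this assumes one partition per day with keys formatted as YYYY-MM-DD):

      from datetime import date, timedelta

      # Build the list of daily partition keys covering [start, end) and
      # reuse the IN clause that currently works.
      start, end = date(2015, 3, 24), date(2015, 4, 1)
      keys = [(start + timedelta(days=i)).isoformat()
              for i in range((end - start).days)]
      in_list = ', '.join('"%s"' % k for k in keys)
      df.where('key in (%s)' % in_list).count()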


          People

            Assignee: Unassigned
            Reporter: Brad Willard (brdwrd)
            Votes: 0
            Watchers: 2
