Spark / SPARK-33314

Avro reader drops rows

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 3.1.0
    • Fix Version/s: 3.1.0
    • Component/s: SQL

    Description

      Under certain circumstances, the V1 Avro reader drops rows. For example:

      scala> val df = spark.range(0, 25).toDF("index")
      df: org.apache.spark.sql.DataFrame = [index: bigint]
      
      scala> df.write.mode("overwrite").format("avro").save("index_avro")
      
      scala> val loaded = spark.read.format("avro").load("index_avro")
      loaded: org.apache.spark.sql.DataFrame = [index: bigint]
      
      scala> loaded.collect.size
      res1: Int = 25
      
      scala> loaded.orderBy("index").collect.size
      res2: Int = 17   <== expected 25
      
      scala> loaded.orderBy("index").write.mode("overwrite").format("parquet").save("index_as_parquet")
      
      scala> spark.read.parquet("index_as_parquet").count
      res4: Long = 17
      

      SPARK-32346 slightly refactored AvroFileFormat and AvroPartitionReaderFactory to use a new iterator-like trait, AvroUtils#RowReader. RowReader#hasNextRow consumes a raw input record and stores the deserialized row for the next call to RowReader#nextRow. Unfortunately, hasNextRow is sometimes called twice before nextRow is called, and the row stored by the first call is lost. For example, BypassMergeSortShuffleWriter#write calls records.hasNext once and then calls it again before it reads the first record.

      RowReader consumes the Avro record in hasNextRow, rather than nextRow, because AvroDeserializer#deserialize potentially filters out the record.
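
The failure mode described above can be reproduced without Spark. The following is a minimal sketch, not Spark's actual code: `ConsumingReader`, its `keep` predicate, and the `Long` rows are hypothetical stand-ins for RowReader, the deserializer's filter, and deserialized rows. It assumes only what the description states, namely that hasNextRow consumes a record and stores one pending row.

```scala
// Hypothetical stand-in for the reader described above; not Spark's code.
// Every call to hasNextRow pulls from the raw input and keeps the result in
// a single slot, so a second call discards the row left by the first.
class ConsumingReader(raw: Iterator[Long], keep: Long => Boolean) {
  private var current: Option[Long] = None

  def hasNextRow: Boolean = {
    current = None
    // Skip filtered records until one survives or the input ends.
    while (current.isEmpty && raw.hasNext) {
      val record = raw.next()
      if (keep(record)) current = Some(record)
    }
    current.isDefined
  }

  def nextRow: Long =
    current.getOrElse(throw new NoSuchElementException("no row available"))
}

val buggyReader = new ConsumingReader(Iterator(0L, 1L, 2L, 3L), _ => true)

// Mimic a caller that first checks for empty input and then loops.
val buggyNonEmpty = buggyReader.hasNextRow   // consumes record 0
val buggyRows = scala.collection.mutable.ArrayBuffer.empty[Long]
while (buggyReader.hasNextRow) {             // consumes record 1, dropping record 0
  buggyRows += buggyReader.nextRow
}
// buggyRows now holds 1, 2, 3: the first record was lost.
```

In this sketch exactly one row is lost per reader, which is consistent with a multi-file read losing several rows in total.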

      Two possible fixes that I thought of:

      1) Keep state in RowReader so that multiple calls to RowReader#hasNextRow with no intervening call to RowReader#nextRow do not consume more than one Avro record. This requires no changes to any code that extends RowReader, just RowReader itself.
      2) Move record consumption to RowReader#nextRow (so that RowReader#nextRow may return None) and wrap any iterator that extends RowReader with a new iterator created by flatMap. This new iterator filters out the Nones and extracts the rows from the Somes. This requires changes to AvroFileFormat and AvroPartitionReaderFactory as well as RowReader.
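
Option 2 can be sketched as follows. This is an illustration under stated assumptions rather than a proposed patch: `OptionReader` and `rowsOf` are hypothetical names, rows are modeled as `Long`, and the `keep` predicate stands in for the filtering done during deserialization.

```scala
// Hypothetical reader for option 2: all consumption happens in nextRow,
// which returns None when the record is filtered out.
class OptionReader(raw: Iterator[Long], keep: Long => Boolean) {
  // No side effects beyond the underlying iterator's own hasNext.
  def hasNextRow: Boolean = raw.hasNext

  def nextRow: Option[Long] = {
    val record = raw.next()
    if (keep(record)) Some(record) else None
  }
}

// Wrap the reader in an iterator of Options, then flatten away the Nones.
def rowsOf(reader: OptionReader): Iterator[Long] = {
  val optional = new Iterator[Option[Long]] {
    override def hasNext: Boolean = reader.hasNextRow
    override def next(): Option[Long] = reader.nextRow
  }
  optional.flatMap(_.iterator)
}

val evenIterator = rowsOf(new OptionReader(Iterator(0L, 1L, 2L, 3L, 4L), _ % 2 == 0))
val evenFirstCheck = evenIterator.hasNext    // repeated checks are harmless here
val evenSecondCheck = evenIterator.hasNext
val evenRows = evenIterator.toList           // keeps 0, 2, 4
```

The cost noted in the description is visible here: every call site that builds an iterator from the reader has to apply the flatMap wrapper.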

      The first option seems the simplest and most straightforward: it requires changes only to AvroUtils#RowReader, not to AvroFileFormat or AvroPartitionReaderFactory. I propose this approach.
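
A minimal sketch of the first option, again outside Spark and not the committed fix: `StatefulReader`, `pending`, and `finished` are hypothetical names, and rows are modeled as `Long`. The idea is that hasNextRow pulls from the input only when no row is already waiting, so repeated calls are idempotent.

```scala
// Hypothetical reader for option 1: hasNextRow is idempotent because it
// only consumes input when no deserialized row is already pending.
class StatefulReader(raw: Iterator[Long], keep: Long => Boolean) {
  private var pending: Option[Long] = None
  private var finished = false

  def hasNextRow: Boolean = {
    while (pending.isEmpty && !finished) {
      if (raw.hasNext) {
        val record = raw.next()
        if (keep(record)) pending = Some(record)   // filtered records are skipped
      } else {
        finished = true
      }
    }
    pending.isDefined
  }

  def nextRow: Long = {
    if (!hasNextRow) throw new NoSuchElementException("no more rows")
    val row = pending.get
    pending = None   // hand the row out exactly once
    row
  }
}

val fixedReader = new StatefulReader(Iterator(0L, 1L, 2L, 3L), _ => true)
val fixedNonEmpty = fixedReader.hasNextRow   // consumes record 0 and holds it
val fixedRows = scala.collection.mutable.ArrayBuffer.empty[Long]
while (fixedReader.hasNextRow) {             // first iteration sees the held record 0
  fixedRows += fixedReader.nextRow
}
// fixedRows holds all four records: 0, 1, 2, 3.
```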

            People

              Assignee: Bruce Robbins (bersprockets)
              Reporter: Bruce Robbins (bersprockets)